package cn.doit.formatChange


import java.util.Properties

import com.alibaba.fastjson.JSON
import com.typesafe.config.ConfigFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by ZHAOXUHUA on 2018/11/26.
  *
  * Spark batch job over comma-separated ad-request logs. It
  *
  *  1. parses the raw log into a typed DataFrame using [[Bz2toParque.LogSchema]]
  *     (the Parquet write of that frame is currently disabled, see `main`),
  *  1. reads the Parquet copy of the data back,
  *  1. counts requests per day / province / city, and
  *  1. stores the counts in MySQL (table `t_dolphin_table1`) and in `data.json`.
  *
  * MySQL connection settings are read from the Typesafe config keys
  * `dolphin.mysql.url`, `.username`, `.password` and `.driver`.
  */
object Bz2toParque {

  // Brings the `toIntPlus` / `toDoublePlus` String extensions used by `toRow` into scope.
  // NOTE(review): they are defined outside this file; presumably lenient parsers - confirm.
  import cn.doit.richString.RichString._

  /** Location of the raw comma-separated log file. */
  private val InputLogPath =
    "D:/学习专用/项目/项目四资料/DAY09/2016-10-01_06_p1_invalid.1475274123982.log.FINISH"

  /** Directory holding the Parquet version of the log. */
  private val ParquetPath = "D:/学习专用/项目/项目四资料/data/dophinData"

  /**
    * Schema of one log record. The position of a field in this list is the
    * position of the corresponding column in the comma-separated line.
    */
  private val LogSchema: StructType = StructType(List(
    StructField("sessionid", StringType),
    StructField("advertisersid", IntegerType),
    StructField("adorderid", IntegerType),
    StructField("adcreativeid", IntegerType),
    StructField("adplatformproviderid", IntegerType),
    StructField("sdkversion", StringType),
    StructField("adplatformkey", StringType),
    StructField("putinmodeltype", IntegerType),
    StructField("requestmode", IntegerType),
    StructField("adprice", DoubleType),
    StructField("adppprice", DoubleType),
    StructField("requestdate", StringType),
    StructField("ip", StringType),
    StructField("appid", StringType),
    StructField("appname", StringType),
    StructField("uuid", StringType),
    StructField("device", StringType),
    StructField("client", IntegerType),
    StructField("osversion", StringType),
    StructField("density", StringType),
    StructField("pw", IntegerType),
    StructField("ph", IntegerType),
    StructField("long", StringType),
    StructField("lat", StringType),
    StructField("provincename", StringType),
    StructField("cityname", StringType),
    StructField("ispid", IntegerType),
    StructField("ispname", StringType),
    StructField("networkmannerid", IntegerType),
    StructField("networkmannername", StringType),
    StructField("iseffective", IntegerType),
    StructField("isbilling", IntegerType),
    StructField("adspacetype", IntegerType),
    StructField("adspacetypename", StringType),
    StructField("devicetype", IntegerType),
    StructField("processnode", IntegerType),
    StructField("apptype", IntegerType),
    StructField("district", StringType),
    StructField("paymode", IntegerType),
    StructField("isbid", IntegerType),
    StructField("bidprice", DoubleType),
    StructField("winprice", DoubleType),
    StructField("iswin", IntegerType),
    StructField("cur", StringType),
    StructField("rate", DoubleType),
    StructField("cnywinprice", DoubleType),
    StructField("imei", StringType),
    StructField("mac", StringType),
    StructField("idfa", StringType),
    StructField("openudid", StringType),
    StructField("androidid", StringType),
    StructField("rtbprovince", StringType),
    StructField("rtbcity", StringType),
    StructField("rtbdistrict", StringType),
    StructField("rtbstreet", StringType),
    StructField("storeurl", StringType),
    StructField("realip", StringType),
    StructField("isqualityapp", IntegerType),
    StructField("bidfloor", DoubleType),
    StructField("aw", IntegerType),
    StructField("ah", IntegerType),
    StructField("imeimd5", StringType),
    StructField("macmd5", StringType),
    StructField("idfamd5", StringType),
    StructField("openudidmd5", StringType),
    StructField("androididmd5", StringType),
    StructField("imeisha1", StringType),
    StructField("macsha1", StringType),
    StructField("idfasha1", StringType),
    StructField("openudidsha1", StringType),
    StructField("androididsha1", StringType),
    StructField("uuidunknow", StringType),
    StructField("userid", StringType),
    StructField("iptype", IntegerType),
    StructField("initbidprice", DoubleType),
    StructField("adpayment", DoubleType),
    StructField("agentrate", DoubleType),
    StructField("lrate", DoubleType),
    StructField("adxrate", DoubleType),
    StructField("title", StringType),
    StructField("keywords", StringType),
    StructField("tagid", StringType),
    StructField("callbackdate", StringType),
    StructField("channelid", StringType),
    StructField("mediatype", IntegerType)
  ))

  /** Column types in positional order, extracted once so `toRow` does not rebuild them per record. */
  private val ColumnTypes: Array[DataType] = LogSchema.fields.map(_.dataType)

  /**
    * Converts one split log line into a [[Row]] that matches [[LogSchema]].
    *
    * The conversion of every column is driven by the schema, so the row layout
    * and the declared column types cannot drift apart.
    *
    * @param columns the split line; must hold at least `LogSchema.fields.length`
    *                entries, any entries beyond that are ignored
    * @return a row with one value per schema field
    */
  private def toRow(columns: Array[String]): Row = {
    def convert(i: Int): Any = ColumnTypes(i) match {
      case IntegerType => columns(i).toIntPlus
      case DoubleType  => columns(i).toDoublePlus
      case _           => columns(i)
    }
    Row.fromSeq(ColumnTypes.indices.map(convert))
  }

  /**
    * Job entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    // try/finally: the context must be stopped even when a read, the query or a write fails.
    try {
      // Working with DataFrames requires a SQLContext instance.
      val sqlContext = new SQLContext(sc)

      // Read the raw file; limit -1 keeps trailing empty columns so the field count is reliable.
      // Lines with fewer columns than the schema are dropped.
      val minColumns = LogSchema.fields.length
      val rowRdd: RDD[Row] = sc.textFile(InputLogPath)
        .map(line => line.split(",", -1))
        .filter(columns => columns.length >= minColumns)
        .map(columns => toRow(columns))

      val frame: DataFrame = sqlContext.createDataFrame(rowRdd, LogSchema)
      // NOTE(review): the Parquet write below is disabled, so `frame` is never
      // materialised and the read that follows relies on output of an earlier run.
      // Re-enable it (with an explicit SaveMode) when the data must be regenerated.
      // frame.write.parquet("D:/学习专用/项目/项目四资料/data/dophinData")

      val adLogs = sqlContext.read.parquet(ParquetPath)

      // Register the DataFrame as a temporary table so it can be queried with SQL.
      adLogs.registerTempTable("ad_analys")
      // The result is used three times (show, JDBC, JSON); cache it so the
      // aggregation does not re-scan the Parquet data for every action.
      val report = sqlContext.sql(
        """
          |select substring(requestdate, 0, 10) date, provincename, cityname, count(1) cnt
          |from ad_analys
          |group by provincename,cityname, substring(requestdate, 0, 10)
        """.stripMargin).cache()
      report.show()

      // MySQL connection settings.
      val config = ConfigFactory.load()
      val url = config.getString("dolphin.mysql.url")
      val jdbcProps = new Properties()
      jdbcProps.setProperty("user", config.getString("dolphin.mysql.username"))
      jdbcProps.setProperty("password", config.getString("dolphin.mysql.password"))
      jdbcProps.setProperty("driver", config.getString("dolphin.mysql.driver"))

      // Overwrite instead of the default ErrorIfExists: the report is a full
      // recomputation, and with the default mode every run after the first one
      // fails because the table / output directory already exists.
      report.write.mode(SaveMode.Overwrite).jdbc(url, "t_dolphin_table1", jdbcProps)
      report.write.mode(SaveMode.Overwrite).json("data.json")
    } finally {
      sc.stop()
    }
  }
}
